package com.iteaj.iot.client;

import com.iteaj.iot.IotCoreConfiguration;
import com.iteaj.iot.IotThreadManager;
import com.iteaj.iot.client.codec.ClientProtocolEncoder;
import com.iteaj.iot.client.handle.ClientServiceHandler;
import com.iteaj.iot.client.protocol.ClientSocketProtocol;
import com.iteaj.iot.ProtocolException;
import com.iteaj.iot.codec.IotMessageDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static com.iteaj.iot.CoreConst.*;

/**
 * Base class for socket clients built on a Netty {@link Bootstrap}.
 *
 * <p>A subclass supplies the channel implementation ({@link #channel()}) and the inbound
 * decoder ({@link #createProtocolDecoder()}); this class wires the pipeline, performs the
 * connect / disconnect / reconnect handling and writes protocols to the current channel.
 *
 * @see TcpSocketClient
 * @see UdpSocketClient
 */
public abstract class SocketClient implements IotClient<NioEventLoopGroup> {

    /** Most recently initialised channel; assigned in the channel initializer or via {@link #setChannel(Channel)}. */
    private Channel channel;
    private Bootstrap bootstrap;
    private final ClientConnectProperties config;
    private ClientComponent clientComponent;

    protected Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * @param clientComponent component this client belongs to
     * @param config          connection settings (host, port, reconnect interval); must not be null
     * @throws IllegalArgumentException if {@code config} is null
     */
    public SocketClient(ClientComponent clientComponent, ClientConnectProperties config) {
        if (config == null) {
            throw new IllegalArgumentException("未指定连接配置[ConnectProperties]");
        }
        this.config = config;
        this.clientComponent = clientComponent;
    }

    /**
     * Builds the bootstrap for this client and installs the channel initializer.
     *
     * @param clientGroup event loop group the bootstrap is bound to
     */
    public void init(NioEventLoopGroup clientGroup) {
        this.bootstrap = new Bootstrap().group(clientGroup)
                .channel(channel()).handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        // Every new channel becomes the client's current channel.
                        SocketClient.this.channel = channel;
                        ChannelPipeline pipeline = channel.pipeline();

                        // Create the decoder and, when it has no message class yet,
                        // give it the one declared by the component.
                        ChannelInboundHandler decoder = createProtocolDecoder();
                        if (decoder instanceof IotMessageDecoder) {
                            Class messageClass = getClientComponent().getMessageClass();
                            if (((IotMessageDecoder) decoder).getMessageClass() == null) {
                                ((IotMessageDecoder) decoder).setMessageClass(messageClass);
                            }
                        }

                        // Attach the connection settings of this client to the channel.
                        channel.attr(CLIENT_KEY).set(SocketClient.this.getConfig());

                        // Both codecs are added with addFirst, so the encoder ends up
                        // in front of the decoder in the pipeline.
                        pipeline.addFirst(CLIENT_DECODER_HANDLER, decoder);
                        pipeline.addFirst(CLIENT_ENCODER_HANDLER, createProtocolEncoder());

                        // Hook for subclass-specific handlers.
                        SocketClient.this.doInitChannel(channel);

                        // The business handler always goes last.
                        pipeline.addLast(CLIENT_SERVICE_HANDLER, new ClientServiceHandler(getClientComponent()));
                    }
                });

        this.doInitOptions(this.bootstrap);
    }

    /**
     * Connects to the configured host and port.
     *
     * @param consumer callback invoked with the connect {@link ChannelFuture}; when null, a
     *                 default callback registers this client with its component on success
     * @param timeout  time in milliseconds the calling thread waits for the attempt to
     *                 finish; values {@code <= 0} do not wait
     */
    @Override
    @SuppressWarnings("unchecked")
    public void connect(Consumer<?> consumer, long timeout) {
        Consumer<ChannelFuture> callback;
        if (consumer == null) {
            callback = this::registerOnSuccess;
        } else {
            callback = (Consumer<ChannelFuture>) consumer;
        }
        this.doConnect(callback, timeout);
    }

    /**
     * Default connect callback: on success, stores this client in the component under the
     * settings attached to the connected channel.
     */
    private void registerOnSuccess(ChannelFuture future) {
        if (!future.isSuccess()) {
            return;
        }

        // Read the attribute from the channel of this very attempt rather than from the
        // mutable field, which may be null or belong to another attempt.
        ClientConnectProperties attached = future.channel().attr(CLIENT_KEY).get();
        if (attached != null) {
            // The client must be stored after a successful connect, otherwise it cannot be looked up later.
            clientComponent.addClient(attached, this);

            if (logger.isDebugEnabled()) {
                logger.debug("客户端({}) 连接服务器成功 - 远程主机 {}:{}"
                        , getClientComponent().getName(), this.getHost(), this.getPort());
            }
        }
    }

    /**
     * Disconnects the current channel.
     *
     * @param remove passed to {@link #disconnectSuccessCall(boolean)} once the disconnect succeeded
     */
    public void disconnect(boolean remove) {
        this.channel.disconnect().addListener(future -> {
            if (!future.isSuccess()) {
                logger.error("客户端({}) 关闭客户端(失败) - 远程地址：{}", getName(), getConfig(), future.cause());
                return;
            }

            disconnectSuccessCall(remove);
            if (logger.isInfoEnabled()) {
                logger.info("客户端({}) 关闭客户端(成功) - 远程地址：{}", getName(), getConfig());
            }
        });
    }

    /**
     * Called after a successful disconnect.
     * Default behaviour: {@code remove=true} removes the client from its component,
     * {@code remove=false} triggers {@link #reconnection(Channel)}.
     */
    protected void disconnectSuccessCall(boolean remove) {
        if (remove) {
            getClientComponent().removeClient(getConfig());
        } else {
            reconnection(this.channel);
        }
    }

    /**
     * Applies channel options to the bootstrap; the default enables {@code SO_KEEPALIVE}.
     */
    protected void doInitOptions(Bootstrap bootstrap) {
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * @return the channel implementation class the bootstrap should create
     */
    protected abstract Class<? extends Channel> channel();

    /**
     * @return the connection settings this client was created with
     */
    public ClientConnectProperties getConfig() {
        return this.config;
    }

    @Override
    public int getPort() {
        return getConfig().getPort();
    }

    @Override
    public String getHost() {
        return getConfig().getHost();
    }

    /**
     * Performs the actual asynchronous connect.
     * @see MultiClientManager#addClient(Object, IotClient) <br>
     *     Note: after a successful connect the client has to be added to the client manager.
     *
     * <p>Once the attempt completes, {@code consumer} is invoked first; afterwards
     * {@link #successCallback(ChannelFuture)} runs on success and
     * {@link #reconnection(Channel)} on failure, even if the consumer threw.
     *
     * @param consumer callback invoked with the completed connect future; may be null
     * @param timeout  time in milliseconds the calling thread waits for completion;
     *                 values {@code <= 0} do not wait
     */
    protected void doConnect(Consumer<ChannelFuture> consumer, long timeout) {
        try {
            final ChannelFuture connectFuture = this.bootstrap.connect(getHost(), getPort());
            connectFuture.addListener(future -> {
                try {
                    if (consumer != null) {
                        consumer.accept((ChannelFuture) future);
                    }
                } finally {
                    if (future.isSuccess()) {
                        this.successCallback((ChannelFuture) future);
                    } else {
                        this.reconnection(((ChannelFuture) future).channel());
                    }
                }
            });

            if (timeout > 0) {
                connectFuture.await(timeout, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            logger.error("客户端({}) 连接异常 - 远程主机: {}:{}", getClientComponent().getName(), this.getHost(), this.getPort(), e);
        } catch (Exception e) {
            logger.error("客户端({}) 连接异常 - 远程主机: {}:{}", getClientComponent().getName(), this.getHost(), this.getPort(), e);
        }
    }

    /**
     * Logs the outcome of a connect attempt: debug on success, error (with cause) on failure.
     */
    protected void connectLogger(ChannelFuture future) {
        if (future.isSuccess()) {
            if (logger.isDebugEnabled()) {
                logger.debug("客户端({}) 连接服务器成功 - 远程主机 {}:{}"
                        , getClientComponent().getName(), this.getHost(), this.getPort());
            }
        } else {
            // Three placeholders, three arguments; the trailing throwable is logged as the exception.
            logger.error("客户端({}) 连接服务器失败 - 远程主机 {}:{}", getClientComponent().getName()
                    , this.getHost(), this.getPort(), future.cause());
        }
    }

    /**
     * Schedules a reconnect when a reconnect interval is configured and the client's
     * current channel is not active.
     * @see ClientConnectProperties#getReconnect() reconnect interval in seconds
     * @param channel channel that went down; the liveness check deliberately uses the
     *                client's current channel ({@link #getChannel()}) instead, so a failed
     *                attempt does not schedule a reconnect while another channel is active
     */
    public void reconnection(Channel channel) {
        long reconnectTime = getConfig().getReconnect();
        if (reconnectTime <= 0) {
            return;
        }

        // A client that never got a channel is treated as inactive.
        final Channel current = this.getChannel();
        if (current != null && current.isActive()) {
            return;
        }

        logger.warn("客户端({}) 断线重连 - 等待重连时间：{}(s) - 远程主机 {}:{}"
                , getClientComponent().getName(), reconnectTime, this.getHost(), this.getPort());
        IotThreadManager.instance().getDeviceManageEventExecutor().schedule(() -> {
            this.connect((future) -> {
                final ChannelFuture channelFuture = (ChannelFuture) future;
                if (channelFuture.isSuccess()) {
                    logger.info("客户端({}) 重连成功 - 远程主机 {}:{}"
                            , getClientComponent().getName(), this.getHost(), this.getPort());
                } else {
                    logger.error("客户端({}) 重连失败 - 远程主机 {}:{}"
                            , getClientComponent().getName()
                            , this.getHost(), this.getPort(), channelFuture.cause());
                }
            }, 2000);
        }, reconnectTime, TimeUnit.SECONDS);
    }

    /**
     * Connects synchronously to the configured host and port.
     * @see #successCallback(ChannelFuture) is NOT invoked by this method
     *
     * @param timeout maximum time to wait in seconds; values {@code <= 0} wait until the
     *                attempt completes
     * @throws ProtocolException if the attempt times out, is interrupted or fails
     */
    public void doConnect(long timeout) throws ProtocolException {
        ChannelFuture future;
        boolean completed;
        try {
            // The bootstrap has no remote address of its own, so pass host and port explicitly.
            future = this.getBootstrap().connect(getHost(), getPort());
            if (timeout > 0) {
                completed = future.await(timeout, TimeUnit.SECONDS);
            } else {
                future.await();
                completed = true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ProtocolException("连接中断("+getClientComponent().getName()+") - " + getHost() + ":" + getPort(), e);
        } catch (Exception e) {
            throw new ProtocolException("连接服务器失败", e);
        }

        if (!completed) {
            // Abandon the pending attempt so a late connection is not left behind unmanaged.
            future.cancel(false);
            throw new ProtocolException("链接超时("+getClientComponent().getName()+") - " + getHost() + ":" + getPort());
        }

        if (!future.isSuccess()) {
            throw toProtocolException(future.cause());
        }

        logger.info("客户端({}) 连接服务器成功 - 远程主机: {}:{}", getClientComponent().getName(), getHost(), getPort());
    }

    /**
     * Maps a connect failure to a {@link ProtocolException}, classifying it by the wrapped
     * cause when there is one and by the failure itself otherwise.
     */
    private ProtocolException toProtocolException(Throwable failure) {
        Throwable root = (failure != null && failure.getCause() != null) ? failure.getCause() : failure;
        if (root instanceof ConnectException) {
            return new ProtocolException("服务端拒绝连接", failure);
        }
        if (root instanceof NoRouteToHostException) {
            return new ProtocolException(failure.getMessage(), failure);
        }
        return new ProtocolException("连接服务器失败", failure);
    }

    /**
     * Writes a protocol to the server.
     * @see ClientProtocolEncoder protocol encoder, the place where the message is actually written
     *
     * <p>If the current channel is inactive a reconnect is attempted first and the protocol
     * is written once the reconnect succeeded; the returned promise reflects the outcome.
     * If the channel is active but not writable, a failed future carrying an
     * {@link UnWritableProtocolException} is returned.
     *
     * @param clientProtocol protocol to send
     * @return future completed with the result of the write
     */
    public ChannelFuture writeAndFlush(ClientSocketProtocol clientProtocol) {
        final Channel current = this.getChannel();
        if (current.isActive()) {
            if (current.isWritable()) {
                return current.writeAndFlush(clientProtocol);
            }
            return current.newFailedFuture(new UnWritableProtocolException(clientProtocol));
        }

        final ChannelPromise promise = current.newPromise();
        // Try to reconnect, then write through whatever channel is current at that point.
        connect((future) -> {
            final ChannelFuture channelFuture = (ChannelFuture) future;
            if (!channelFuture.isSuccess()) {
                promise.setFailure(channelFuture.cause());
                return;
            }

            this.getChannel().writeAndFlush(clientProtocol).addListener(written -> {
                if (written.isSuccess()) {
                    promise.setSuccess();
                } else {
                    promise.setFailure(written.cause());
                }
            });
        }, 2000);

        return promise;
    }

    /**
     * Hook for subclasses to add their own handlers; called after the codecs were added
     * and before the service handler is appended. The default does nothing.
     */
    protected void doInitChannel(Channel channel) { }

    /**
     * Callback invoked after a successful asynchronous connect. The default does nothing.
     * @param future the completed connect future
     */
    protected void successCallback(ChannelFuture future) { }

    /**
     * Creates the inbound decoder of this client.
     * @see io.netty.handler.codec.ByteToMessageDecoder subclasses of this type must not be {@link io.netty.channel.ChannelHandler.Sharable}
     * @return the decoder to install in the pipeline
     */
    protected abstract ChannelInboundHandler createProtocolDecoder();

    /**
     * Creates the outbound encoder of this client.
     * @return by default the framework encoder {@link ClientProtocolEncoder}
     */
    protected ChannelOutboundHandlerAdapter createProtocolEncoder() {
        return new ClientProtocolEncoder(getClientComponent());
    }

    public ClientComponent getClientComponent() {
        return clientComponent;
    }

    public void setClientComponent(ClientComponent clientComponent) {
        this.clientComponent = clientComponent;
    }

    /**
     * @return the name of the owning component
     */
    public String getName() {
        return this.clientComponent.getName();
    }

    public Bootstrap getBootstrap() {
        return bootstrap;
    }

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }
}
